# coding=utf-8
import datetime
import pandas as pd
import numpy as np
import statsmodels.api as sm
import pmdarima as pm
import matplotlib.pyplot as plt
import statsmodels.tsa.stattools as st
from dateutil.relativedelta import relativedelta
from statsmodels.tsa.arima_model import ARMA, ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.stats.outliers_influence import variance_inflation_factor


def stable_test(df, nlags=10):
    '''
    Stationarity check (ADF unit-root test) for every column of a time-series
    DataFrame. If the unit-root test's p-value is below 0.05 the series is
    considered stationary; otherwise it is treated as non-stationary.

    Args:
        df: a DataFrame-like object containing the variables to test.
        nlags: kept for interface compatibility only -- the lag order is
            chosen automatically via AIC, so this argument is not used.

    Returns:
        A list of dicts, one per column, each holding the ADF test statistic,
        p-value, lags used, observation count, and the column name.
    '''
    labels = ['Test Statistic', 'p-value', '#Lags Used',
              'Number of Observations Used']
    results = []
    for name in df.columns.tolist():
        stat = sm.tsa.stattools.adfuller(df[name], autolag='AIC')
        entry = pd.Series(stat[0:4], index=labels).to_dict()
        entry['variable'] = name
        results.append(entry)
    return results


def whitenoise_test(df, lags=1):
    '''
    Ljung-Box white-noise test. If the p-value is below 0.05 we reject the
    null hypothesis and treat the series as non-white-noise (serially
    correlated); otherwise the series is uncorrelated white noise.

    Args:
        df: a DataFrame-like object containing the series to test.
        lags: number of lags for the Ljung-Box statistic.

    Returns:
        A list of dicts, one per column, with the first-lag Ljung-Box
        statistic and its p-value.
    '''
    ret = []
    for col in df.columns.to_list():
        lb = acorr_ljungbox(df[col], lags=lags, boxpierce=False)
        # BUG FIX: since statsmodels 0.12 acorr_ljungbox returns a DataFrame
        # by default, so the original tuple unpacking raised at runtime.
        # Handle both the old (tuple of arrays) and new (DataFrame) shapes.
        if isinstance(lb, pd.DataFrame):
            lb_stat = lb['lb_stat'].iloc[0]
            p_val = lb['lb_pvalue'].iloc[0]
        else:
            stats, pvals = lb
            lb_stat, p_val = stats[0], pvals[0]
        ret.append({'variable': col, 'lb_stat': lb_stat, 'p_value': p_val})
    return ret


def decompose(df, model='additive'):
    '''
    Decompose a time series into trend, seasonal and residual components,
    and plot the original series together with the three components.

    Args:
        df: the time-series data (index is assumed to carry the frequency).
        model: decomposition type, 'additive' (default) or 'multiplicative'.

    Returns:
        trend: the trend component.
        seasonal: the seasonal component.
        residual: the residual component.
    '''
    result = seasonal_decompose(df, model=model)
    panels = [
        (df, 'Original'),
        (result.trend, 'Trend'),
        (result.seasonal, 'Seasonality'),
        (result.resid, 'Residuals'),
    ]
    # 4 stacked subplots: 411, 412, 413, 414.
    for position, (series, label) in enumerate(panels, start=411):
        plt.subplot(position)
        plt.plot(series, label=label)
        plt.legend(loc='best')
    plt.tight_layout()
    plt.show()
    return result.trend, result.seasonal, result.resid


def autocorr_test(data, qstat=True):
    '''
    Autocorrelation test: ACF values with Ljung-Box Q statistics per lag.
    Larger autocorrelation coefficients mean the past has a stronger
    influence on the present.

    Args:
        data: a 1-d series-like object containing the variable to test.
        qstat: must be True -- the function unpacks the Q statistics; kept
            as a parameter for interface compatibility.

    Returns:
        A DataFrame indexed by lag with columns AC, Q_Stat and Prob(>Q).
    '''
    # BUG FIX: the original passed nlags=len(data), which produces one more
    # ACF value than the table has rows (and newer statsmodels rejects
    # nlags >= nobs outright). Use len(data) - 1 so all columns align.
    nlags = len(data) - 1
    acf, q_stat, p_val = sm.tsa.acf(data, nlags=nlags, qstat=qstat)
    table = pd.DataFrame(np.c_[range(1, nlags + 1), acf[1:], q_stat, p_val],
                         columns=['lag', "AC", "Q_Stat", "Prob(>Q)"])
    table.set_index('lag', inplace=True)
    # BUG FIX: original message was a garbled placeholder-less f-string
    # ("The autocorrelation result of is").
    print('The autocorrelation result is:\n', table)
    return table


def autocorr_plot(data):
    '''
    Plot the autocorrelation (ACF) and partial autocorrelation (PACF)
    figures. Larger coefficients mean the past has a stronger influence
    on the present.

    Args:
        data: a series-like object.
    '''
    # BUG FIX: the original contained the IPython magic '%matplotlib inline',
    # which is a SyntaxError in a plain .py module and prevented the file
    # from importing at all. It was removed; plt.show() renders the figures
    # instead, consistent with decompose() above.
    fig1 = plt.figure(figsize=(12, 8))
    ax1 = fig1.add_subplot(111)
    sm.graphics.tsa.plot_acf(data, lags=len(data) - 1, ax=ax1)

    # PACF: at most half the sample length is usable for partial ACF.
    fig2 = plt.figure(figsize=(12, 8))
    ax2 = fig2.add_subplot(111)
    sm.graphics.tsa.plot_pacf(data, lags=int(len(data) / 2) - 1, ax=ax2)
    plt.show()


def _arma_diagnose_and_forecast(model, df, lags, return_df, n_periods, alpha):
    '''
    Shared tail of arma_model: residual diagnostics plus in-sample and
    out-of-sample forecast frames for a fitted ARMA results object.

    Returns:
        (in_pred, out_pred): two DataFrames indexed by date with columns
        'predict value', 'low bound', 'up bound'.
    '''
    # Residual diagnostics: Ljung-Box, ACF table, Durbin-Watson.
    print('The Ljung-Box test result is:\n', acorr_ljungbox(model.resid.values, lags=lags, return_df=return_df))
    print('The auto-correlation test result is:\n', autocorr_test(model.resid.values))
    print('The Durbin-Watson test result is:', sm.stats.durbin_watson(model.resid.values))

    # In-sample point forecasts with confidence bounds. Uses the private
    # _forecast_conf_int with |prediction - actual| as the error scale,
    # as the original code did -- NOTE(review): this is an approximation,
    # not the model's analytic in-sample interval.
    in_pred_val = model.predict()
    fcerr = abs(in_pred_val.values.squeeze() - df.values.squeeze())
    in_pred_int = model._forecast_conf_int(in_pred_val, fcerr, alpha=alpha)
    in_low_bound, in_up_bound = in_pred_int.T.tolist()
    in_pred = pd.DataFrame({'date': df.index.values,
                            'predict value': in_pred_val.tolist(),
                            'low bound': in_low_bound,
                            'up bound': in_up_bound})
    in_pred.set_index('date', inplace=True)

    # Out-of-sample forecasts. Future dates step by 3 months per period --
    # assumes quarterly data; confirm against the caller's frequency.
    pred_val, stderr, pred_intval = model.forecast(steps=n_periods, alpha=alpha)
    out_low_bound, out_up_bound = pred_intval.T.tolist()
    last_date = in_pred_val.index[-1]
    fut_date = [(last_date + relativedelta(months=(i + 1) * 3)).strftime("%Y-%m-%d")
                for i in range(n_periods)]
    out_pred = pd.DataFrame({'date': fut_date,
                             'predict value': pred_val.tolist(),
                             'low bound': out_low_bound,
                             'up bound': out_up_bound})
    out_pred.set_index('date', inplace=True)
    return in_pred, out_pred


def arma_model(df, p, q, auto_order=False, method='mle', trend='c', n_periods=2, lags=1, alpha=0.05, return_df=True):
    '''
    ARMA-family forecasting (covers AR, MA and ARMA; the concrete form is
    determined by the orders the user supplies).

    Args:
        df: a DataFrame-like object containing y (datetime index expected).
        p: AR order.
        q: MA order.
        auto_order: if True, pick (p, q) automatically by minimum AIC over
            a 10x10 grid; if False, fit with the given (p, q).
        method: parameter-estimation method, 'mle' by default.
        trend: trend term passed to fit() (ignored in the auto branch,
            which fits with trend='nc' as in the original code).
        n_periods: number of out-of-sample forecast steps.
        lags: lags for the residual Ljung-Box test.
        alpha: significance level for the forecast intervals.
        return_df: passed through to acorr_ljungbox.

    Returns:
        (in_pred, out_pred): in-sample and out-of-sample forecast frames;
        diagnostic statistics are printed along the way.
    '''
    if not auto_order:
        model = ARMA(df, order=(p, q)).fit(method=method, trend=trend)
        print(model.summary())
        return _arma_diagnose_and_forecast(model, df, lags, return_df, n_periods, alpha)

    # Automatic order selection: minimum AIC over a 10x10 grid.
    # Rule of thumb: the order should not exceed a tenth of the sample size.
    best_aic = np.inf
    best_order = None
    best_model = None
    for i in range(10):
        for j in range(10):
            try:
                # BUG FIX: the original fitted on an undefined name `x1`;
                # the data to fit is `df`.
                tmp_mdl = ARMA(df, order=(i, j)).fit(method='mle', trend='nc')
                tmp_aic = tmp_mdl.aic
            except Exception:
                # Many (i, j) combinations simply fail to converge; skip them.
                continue
            if tmp_aic < best_aic:
                best_aic = tmp_aic
                best_order = (i, j)
                best_model = tmp_mdl
    print('aic: {:6.5f} | order: {}'.format(best_aic, best_order))
    print(best_model.summary())
    # BUG FIX: the original referenced the undefined `model` here instead of
    # `best_model` when building the in-sample confidence interval.
    return _arma_diagnose_and_forecast(best_model, df, lags, return_df, n_periods, alpha)


def arima_model(df, p, d, q, auto_order=False, lags=1, n_periods=2, alpha=0.05, return_df=True):
    '''
    ARIMA modelling with in-sample and out-of-sample forecasting.

    Args:
        df: a DataFrame-like object containing y (datetime index expected).
        p, d, q: ARIMA orders (used when auto_order is False).
        auto_order: if True, let pmdarima.auto_arima choose the orders;
            otherwise fit statsmodels ARIMA(p, d, q).
        lags: lags for the residual Ljung-Box test.
        n_periods: number of out-of-sample forecast steps.
        alpha: significance level for the forecast intervals.
        return_df: passed through to acorr_ljungbox.

    Returns:
        (in_pred, out_pred): in-sample and out-of-sample forecast frames;
        diagnostic statistics are printed along the way.
    '''
    if not auto_order:
        model = ARIMA(df, order=(p, d, q)).fit(disp=0)
        print(model.summary())
        # Residual diagnostics.
        print('The Ljung-Box test result is:\n', acorr_ljungbox(model.resid.values, lags=lags, return_df=return_df))
        print('The auto-correlation test result is:\n', autocorr_test(model.resid.values))
        print('The Durbin-Watson test result is:', sm.stats.durbin_watson(model.resid.values))
        # In-sample forecasts: differencing drops the first `d` observations,
        # hence the df.iloc[d:] alignment. NOTE(review): the interval uses the
        # private _forecast_conf_int with |error| as scale, as before.
        in_pred_val = model.predict()
        fcerr = abs(in_pred_val.values.squeeze() - df.iloc[d:].values.squeeze())
        in_pred_int = model._forecast_conf_int(in_pred_val, fcerr, alpha=alpha)
        in_low_bound, in_up_bound = in_pred_int.T.tolist()
        # BUG FIX: '%F' is a glibc-only strftime directive (fails on Windows);
        # use the equivalent portable '%Y-%m-%d'.
        in_pred = pd.DataFrame({'date': [x.strftime('%Y-%m-%d') for x in df.iloc[d:].index],
                                'predict value': in_pred_val.tolist(),
                                'low bound': in_low_bound,
                                'up bound': in_up_bound})
        in_pred.set_index('date', inplace=True)

        # Out-of-sample forecasts. 3-month steps assume quarterly data --
        # confirm against the caller's frequency.
        pred_val, stderr, pred_intval = model.forecast(steps=n_periods, alpha=alpha)
        out_low_bound, out_up_bound = pred_intval.T.tolist()
        last_date = in_pred_val.index[-1]
        fut_date = [(last_date + relativedelta(months=(i + 1) * 3)).strftime("%Y-%m-%d")
                    for i in range(n_periods)]
        out_pred = pd.DataFrame({'date': fut_date,
                                 'predict value': pred_val.tolist(),
                                 'low bound': out_low_bound,
                                 'up bound': out_up_bound})
        out_pred.set_index('date', inplace=True)
        return in_pred, out_pred

    # Automatic order selection via pmdarima's stepwise AIC search.
    # BUG FIX: original passed df.value, which only works if the column is
    # literally named 'value'; pass the first (only) column instead.
    model = pm.auto_arima(df.iloc[:, 0], start_p=1, start_q=1,
                          information_criterion='aic',
                          test='adf',       # use adftest to find optimal 'd'
                          max_p=3, max_q=3, # maximum p and q
                          m=1,              # frequency of series
                          d=None,           # let model determine 'd'
                          seasonal=False,   # No Seasonality
                          start_P=0,
                          D=0,
                          trace=True,
                          error_action='ignore',
                          suppress_warnings=True,
                          stepwise=True)
    print(model.summary())
    # BUG FIX: in pmdarima, resid() is a method, not an attribute with
    # .values as the original assumed.
    resid = np.asarray(model.resid())
    print('The Ljung-Box test result is:\n', acorr_ljungbox(resid, lags=lags, return_df=return_df))
    print('The auto-correlation test result is:\n', autocorr_test(resid))
    print('The Durbin-Watson test result is:', sm.stats.durbin_watson(resid))
    # BUG FIX: pmdarima's predict() is out-of-sample only and the model has
    # no _forecast_conf_int; use predict_in_sample(return_conf_int=True) for
    # the in-sample path (it is aligned with the full sample / df.index).
    # The original also referenced the undefined name `parm_diff` here.
    in_pred_val, in_pred_int = model.predict_in_sample(return_conf_int=True, alpha=alpha)
    in_low_bound, in_up_bound = np.asarray(in_pred_int).T.tolist()
    in_pred = pd.DataFrame({'date': [x.strftime('%Y-%m-%d') for x in df.index],
                            'predict value': list(in_pred_val),
                            'low bound': in_low_bound,
                            'up bound': in_up_bound})
    in_pred.set_index('date', inplace=True)

    # BUG FIX: pmdarima has no forecast(steps=...); out-of-sample forecasts
    # come from predict(n_periods=..., return_conf_int=True).
    pred_val, pred_intval = model.predict(n_periods=n_periods, return_conf_int=True, alpha=alpha)
    out_low_bound, out_up_bound = np.asarray(pred_intval).T.tolist()
    last_date = df.index[-1]
    fut_date = [(last_date + relativedelta(months=(i + 1) * 3)).strftime("%Y-%m-%d")
                for i in range(n_periods)]
    out_pred = pd.DataFrame({'date': fut_date,
                             'predict value': list(pred_val),
                             'low bound': out_low_bound,
                             'up bound': out_up_bound})
    out_pred.set_index('date', inplace=True)
    return in_pred, out_pred

def main(df):
    '''
    End-to-end time-series modelling pipeline: decomposition, smoothing,
    stationarity handling (log / differencing), model selection (ARMA /
    ARIMA) and back-transformation of the forecasts.

    Args:
        df: a single-column DataFrame with a datetime index.

    Returns:
        (in_pred, out_pred) forecast frames on the original scale, or None
        when the (smoothed) series turns out to be white noise.
    '''
    # Step1: plot the raw series.
    print('原始数据的时序图为:\n', df.plot())

    # Step2: decompose into trend / seasonal / residual.
    trend, seasonal, residual = decompose(df)

    # Step3: smooth with a 4-period rolling mean.
    parm = df.rolling(window=4).mean()
    parm.dropna(inplace=True)

    # Step4: stationarity check on the smoothed data; apply log and, if
    # needed, successive differencing until stationary (at most 2nd order).
    stat_ret = stable_test(parm)
    flag = None
    if stat_ret[0]['p-value'] >= 0.01:
        print('该数据为非平稳时间序列')
        parm_log = np.log(parm)
        parm_stat = stable_test(parm_log)
        print('该数据集已经采用对数处理')
        n = 0
        flag = 'log'
        # BUG FIX: the original recomputed parm_log.diff(1) every iteration,
        # so "2nd-order" differencing was never actually applied; difference
        # the previous result cumulatively instead.
        current = parm_log
        while parm_stat[0]['p-value'] >= 0.01:
            if n == 2:
                raise ValueError('由于数据取对数，做2阶差分后仍不平稳，故此数据集不适用于时间序列模型')
            current = current.diff(1)
            current.dropna(inplace=True)
            parm_diff = current
            parm_stat = stable_test(parm_diff)
            n += 1
            flag = f'diff{n}'
            print(f'该数据已经进过{n}阶差分处理')
    else:
        # BUG FIX: the original had the SyntaxError 'els e:' here.
        print('原始数据为平稳时间序列')
        ret2 = whitenoise_test(parm)
        if ret2[0]['p_value'] < 0.05:
            print('该序列为非白噪声序列')
            flag = 'stable'
        else:
            print('该序列为白噪声序列')
            flag = 'white-noise'

    # Step5: autocorrelation diagnostics (ACF table, ACF/PACF plots).
    if flag == 'stable':
        auto_ret = autocorr_test(parm)
        autocorr_plot(parm)
    elif flag == 'log':
        # BUG FIX: the original compared against the literal 'flag', so the
        # log branch was unreachable.
        auto_ret = autocorr_test(parm_log)
        autocorr_plot(parm_log)
    elif flag in ('diff1', 'diff2'):
        auto_ret = autocorr_test(parm_diff)
        autocorr_plot(parm_diff)
    else:
        print('由于源数据为白噪声序列，故对白噪声序列的研究没有意义')

    # Step6: fit the time-series model appropriate for the transformation.
    if flag == 'stable':
        in_pred, out_pred = arma_model(parm, 1, 0)
    elif flag == 'log':
        in_pred, out_pred = arma_model(parm_log, 1, 0)
    elif flag == 'diff1':
        in_pred, out_pred = arima_model(parm_diff, 1, 1, 1)
    elif flag == 'diff2':
        in_pred, out_pred = arima_model(parm_diff, 1, 2, 1)
    else:
        print('由于源数据为白噪声序列，故对白噪声序列的研究没有意义')

    # Step7: invert the transformations to get forecasts on the raw scale.
    if flag == 'stable':
        return in_pred, out_pred
    elif flag == 'log':
        inverse_in_pred = np.exp(in_pred)
        inverse_out_pred = np.exp(out_pred)
        return inverse_in_pred, inverse_out_pred
    elif flag == 'diff1':
        # Undo one difference then the log. NOTE(review): the .iloc offsets
        # below mirror the original code's alignment with arima_model's
        # in-sample output (which drops d rows) -- verify against real data.
        inverse_in_pred = np.exp(in_pred + parm_log.shift(1).iloc[2:].values)
        inverse_out_pred = np.exp(out_pred + parm_log.iloc[-1].values[0])
        return inverse_in_pred, inverse_out_pred
    elif flag == 'diff2':
        # Undo two differences then the log (same alignment caveat as above).
        parm_diff1 = parm_log.diff(1).dropna().iloc[2:].shift(1).dropna().values + in_pred
        parm_diff2 = parm_log.iloc[3:].shift(1).dropna().values + parm_diff1
        inverse_in_pred = np.exp(parm_diff2)
        inverse_out_pred = np.exp(out_pred + parm_log.iloc[-1].values[0])
        return inverse_in_pred, inverse_out_pred
    else:
        return


if __name__ == '__main__':
    # Load the sample data set and run the full modelling pipeline.
    data = pd.read_excel('/Users/lovecsc/Desktop/data.xlsx').set_index('date')
    main(data)
